-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support transparent reconnects on the server #19
Conversation
I think the Travis failure may be because of a race (the same test passes for me locally)... taking a look. |
Ok, so the issue seemed to be that when the connection was lost, there could be some delay before the stream's I couldn't come up with any way to handle this except capturing the error during writing to the stream, handling it as a disconnect, and re-queuing the message. This appears to work, however it feels a bit dirty, and I'm concerned it could result in the messages getting out-of-order (if two are queued synchronously, then the first will trigger the disconnect and be re-queued behind the first). Solving this might involve draining the stream and re-queuing them all. I can look at this, but would like to know that this seems like a reasonable direction first (cc @grouma). |
lib/server/sse_handler.dart
Outdated
final Duration _keepAlive; | ||
|
||
/// Whether the connection is in the timeout period waiting for a reconnect. | ||
bool _isTimingOut = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of holding onto a boolean reference we can probably hold onto the underlying timer and cancel as necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done! However it's slightly weird in that after cancelling I also need to set it to null
so I can check for nulls elsewhere (otherwise we'd probably need a getter that wraps up checking that it is either null or cancelled - which would look a lot like this boolean - but let me know if you think that would be better).
lib/server/sse_handler.dart
Outdated
void _handleDisconnect() { | ||
if (_keepAlive == null) { | ||
_close(); | ||
} else if (!_isTimingOut) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic will likely be cleaned up if we only have reference to the current timer instead of indirectly making refrerence through the _isTimeOut member.
lib/server/sse_handler.dart
Outdated
/// Whether the connection is in the timeout period waiting for a reconnect. | ||
bool _isTimingOut = false; | ||
|
||
/// The subscription that passes messages outgoing messages to the sink. This |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment reads odd to me. Also new line after the first sentence.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, there was an extra "messages" there. Fixed - let me know if it still doesn't look right.
lib/server/sse_handler.dart
Outdated
|
||
final _closedCompleter = Completer<void>(); | ||
|
||
SseConnection(this._sink) { | ||
_outgoingController.stream.listen((data) { | ||
SseConnection(this._sink, {Duration keepAlive}) : _keepAlive = keepAlive { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add a comment indicating what keepAlive
means here and what a null
value implies.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added a comment to the constructor describing this (I presume it's what you had in mind - if not, let me know).
lib/server/sse_handler.dart
Outdated
// hasn't fired yet, so pause the subscription, re-queue the message | ||
// and handle the error as a disconnect. | ||
_handleDisconnect(); | ||
_outgoingController.add(data); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An alternative would be to use a StreamQueue
and use peak
/ next
to conditionally move forward. That should guarantee order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds good - the order does concern me. However I'm not sure how this would work - without the stream, how would we fire this code?
I see there's a sync
argument in StreamController that could fix this (this code would fire on the first event, so there wouldn't be others to get out of order), though I don't know if it'd introduce any other side-effects?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm thinking something like this:
var outgoingStreamQueue = StreamQueue(_outgoingController.stream);
while (await outgoingStreamQueue.hasNext) {
var peek = await outgoingStreamQueue.peek;
try {
send(peek);
await outgoingStreamQueue.next;
} catch (e) {
// Handle error and don't call `next`
}
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
await outgoingStreamQueue.hasNext
Aha, I missed that - this looks sensible to me. I'll try and make it work Monday (as well as test on GitPod to confirm it actually solves the issue). Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this code would burn CPU when an exception occurs (if we don't call next
, hasNext
will return true
and just sit in a tight loop). The existing code pauses the stream so it wouldn't enter the handler again.
A fix could be to await Future.delayed(const Duration(milliseconds: x))
though it feels a bit weird (and if the delay is too large, we introduce latency, and too low and we burn more CPU).
Can you think of a better way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went with a wait period of 200ms for now and added a test for in-order messages. I think this all works, though I don't know how you feel about the 200ms retry (I thought about using exponential backoff, but we could easily end up with a large delay that would blow the keepAlive period and a simple check 5 times per second doesn't seem too wasteful for something we'd expect to occur within a number of seconds).
LMK what you think!
// If we're in a KeepAlive timeout, there's nowhere to send messages so
// wait a short period and check again.
if (isInKeepAlivePeriod) {
await Future.delayed(const Duration(milliseconds: 200));
continue;
}
lib/server/sse_handler.dart
Outdated
} | ||
|
||
// TODO(dantup): @visibleForTesting? | ||
void closeSink() => _sink.close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a slightly better pattern to this but it will require some work.
Let's move this sse_handler.dart
under src
. Make this closeSink
a top level method that takes an SseConnection
. Now export everything under sse_handler.dart
but closeSink
.
In the test you can import from src
without concern and call closeSink
. Note you can also annotate that it is only visible for testing.
See this parallel example:
https://github.com/dart-lang/webdev/blob/2958ede70f20c402e869b4169f3d2f97162e51d4/dwds/lib/src/connections/debug_connection.dart#L50
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, nice! I've done that - and I committed the move as its own changeset to try and avoid Git just showing "this file deleted" and "this file added", however the overall diff here seems to have ignored that. This could cause conflicts if anyone else has outstanding changes in that file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we have any pending PRs so LGTM.
Thanks for the PR. Mostly LGTM. I think we can improve on some of the patterns. Let me know if you have any questions. |
lib/src/sse_handler.dart
Outdated
if (_keepAlive == null) { | ||
// Close immediately if we're not keeping alive. | ||
_close(); | ||
} else if (_keepAliveTimer == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can just be !_keepAliveTimer?.isActive ?? true
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This triggered a lint:
The value of the '?.' operator can be 'null', which isn't appropriate as an operand of a logical operator
Since we also checked this in SseConnection
, I made a bool get isInKeepAlivePeriod => _keepAliveTimer?.isActive ?? false;
to simplify the code a little.
lib/src/sse_handler.dart
Outdated
// If we got here then the sink may have closed but the stream.onDone | ||
// hasn't fired yet, so pause the subscription, re-queue the message | ||
// and handle the error as a disconnect. | ||
_handleDisconnect(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you see my suggestion with using a StreamQueue here so that we can use peek
and next
to advance the queue only if we are successful? This way we don't have to re-queue the message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, I replied to the comment here -> #19 (comment) I wasn't sure how best to change to it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I replied to the thread. Let me know if you have questions.
Curently fails with: ``` Expected: ['one', 'two'] Actual: ['two', 'one'] ```
The bot failure looks to be because in Dart 2.1.0 (the failing bot), adding to the closed stream did not throw an error. This was changed here (for Dart v2.1.2): dart-lang/sdk@bb560bb#diff-9a93a60cb56c68dbbe9ecc961fc5cad9 That means this reconnect behaviour will not work correctly for Dart < v2.1.2. I presume that's ok, but I'm not sure what to do about the bot? (FWIW I tried this out on GitPod, and while it solved the issue of dropped connections terminating the debug proxy, it's still not fully working (messages never arrive on the new connection)... I'll need to do some more debugging of that, so even if we're happy with this we might want to hold off merging until we understand whether it's related to this. |
Actually, it does work on GitPod! I had only set KeepAlive on the extension backend connection, however the injected client was also doing some cleanup when its sink closed (where I put the original logging), which was breaking things. I set them both to 30s KeepAlive (likely way longer than required) and I can still toggle BPs and see them in the event stream of the re-connected connections: :-) |
LGTM. We'll need to update the minimum SDK version given the closed logic change. |
Thanks! |
@grouma this is an attempt to fix #18 (may be easier to view the diff ignoring whitespace since some code got indenting and makes the diff look much bigger than it is).
However there is an exposed method here -
closeSink
that closes the underlying sink (in order to test - we can't close the exposedsink
because it closes the stream controller that needs to continue to be used). I'm not sure if there's a better way (we can add@visibleForTesting
, thoughmeta
isn't currently referenced here).Happy to make changes if this isn't what you had in mind (and I can test it end-to-end through dwds and GitPod to confirm it works prior to merging it).